package flink.examples;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * Example job that splits a stream of {@code WaterSensor} records using side outputs.
 *
 * <p>Text lines are read from a socket, converted to {@code WaterSensor} objects by
 * {@code WaterSensorMapFunction}, and then routed by sensor id: records whose id is
 * {@code "s1"} or {@code "s2"} go to a dedicated side output each, and every other
 * record stays on the main output.
 */
public class SplitStreamByOutputTag {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> ds =
                env.socketTextStream("hadoop102", 7777)
                        .map(new WaterSensorMapFunction());

        // The element type is passed explicitly as TypeInformation, so the typed
        // constructor can be used directly; no raw type or anonymous subclass needed.
        final OutputTag<WaterSensor> s1 =
                new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        final OutputTag<WaterSensor> s2 =
                new OutputTag<>("s2", Types.POJO(WaterSensor.class));

        // process() always returns the main stream; the side outputs are fetched
        // from it afterwards via getSideOutput().
        SingleOutputStreamOperator<WaterSensor> ds1 =
                ds.process(new ProcessFunction<WaterSensor, WaterSensor>() {
                    @Override
                    public void processElement(
                            WaterSensor value,
                            Context ctx,
                            Collector<WaterSensor> out) throws Exception {
                        if ("s1".equals(value.getId())) {
                            ctx.output(s1, value);
                        } else if ("s2".equals(value.getId())) {
                            ctx.output(s2, value);
                        } else {
                            // Main stream: everything that is neither s1 nor s2.
                            out.collect(value);
                        }
                    }
                });

        ds1.print("主流，非 s1,s2 的传感器");

        SideOutputDataStream<WaterSensor> s1DS =
                ds1.getSideOutput(s1);
        SideOutputDataStream<WaterSensor> s2DS =
                ds1.getSideOutput(s2);

        // Side outputs are printed via printToErr so they are easy to tell apart
        // from the main stream's print() output.
        s1DS.printToErr("s1");
        s2DS.printToErr("s2");

        env.execute();
    }
}